CREATE schema if not exists sensor;
create role sensordatain connection limit 10 password '';
create role sensordataread connection limit 10 password '';

drop table sensor.datain;
drop table sensor.data_storage;
drop table sensor.last_data;
drop table sensor.cfg_topic;
drop table sensor.topic_list;
drop table sensor.topic_def;
create table if not exists sensor.datain (timestamp bigint not null,
	topic varchar(256),
	value bigint not null);
create table if not exists sensor.data_storage (timestamp bigint not null,
	topicid bigint not null,
	value bigint not null);
create table if not exists sensor.last_data (topicid bigint not null,
	value bigint not null);
create table if not exists sensor.cfg_topic (feld varchar(256),
	pos smallint not null,
	minlength smallint );
create table if not exists sensor.topic_list (topicid bigint not null primary key,
	topic varchar(256) not null unique);
create table if not exists sensor.topic_def (topicid bigint not null,
	feld varchar(256),
	inhalt varchar(256));

insert into sensor.cfg_topic (feld,pos,minlength) values ('device',2,NULL);
insert into sensor.cfg_topic (feld,pos,minlength) values ('quantity',-1,NULL);
insert into sensor.cfg_topic (feld,pos,minlength) values ('place',3,NULL);
insert into sensor.cfg_topic (feld,pos,minlength) values ('internal_id',4,5);

grant select,insert,update on sensor.data_storage to akosensordatain with grant option;
grant insert,select,update on sensor.last_data to akosensordatain with grant option;
grant insert,select on sensor.topic_list to akosensordatain with grant option;
grant insert,select on sensor.topic_def to akosensordatain with grant option;
grant execute on all procedures in schema sensor to akosensordatain with grant option;
grant select,insert,update on sensor.data_storage to sensor with grant option;
grant insert,select,update on sensor.last_data to sensor with grant option;
grant insert,select on sensor.topic_list to sensor with grant option;
grant insert,select on sensor.topic_def to sensor with grant option;
grant execute on all procedures in schema sensor to sensor with grant option;
grant connect on database sensor to sensor;
grant select on akosensor.data_storage to sensordataread with grant option;

CREATE OR REPLACE PROCEDURE sensor.insert_topic(vtopic varchar(1024),newid bigint)
language plpgsql
as $$
 declare
   feldname varchar(1024);
   tpos int;
   feldinhalt varchar(64);
   bcfgDone INT;
   cfg_curs CURSOR FOR SELECT feld,pos FROM sensor.cfg_topic as cfgt where cfgt.minlength is null or cfgt.minlength <= (select (length(vtopic) - length(replace(vtopic,'/','')) +1));
begin
 OPEN cfg_curs;
 loop
  fetch cfg_curs into feldname,tpos;
  exit when not found;

  if (tpos < 0) then
   feldinhalt := reverse(split_part(reverse(vtopic),'/',abs(tpos)));
  else
   feldinhalt := split_part(vtopic,'/',tpos);
  end if;
  if (newid in (select topicid from sensor.topic_def where topic_def.feld=feldname)) = false then  
   insert into sensor.topic_def (topicid,feld,inhalt) values (newid,feldname,feldinhalt);
  end if;
 end loop;
 close cfg_curs;
end
$$;


CREATE OR REPLACE PROCEDURE sensor.insert_data(intopic varchar(256),invalue bigint,tstamp bigint)
language plpgsql
as $$
declare
  crctopic bigint;
BEGIN
  if intopic is null then
    raise 'missing Topic' using message = 'Topic is missing';
  end if;
  crctopic=crc32(intopic);
  if invalue is null then
    raise 'missing value' using message = 'Value is missing';
  end if;
  if tstamp is null then
    raise 'missing time' using message = 'Time is missing';
  end if;
  
    if (crctopic in (select topicid from sensor.topic_list))=false then
      call sensor.insert_topic(intopic,crctopic);
      insert into sensor.topic_list (topicid,topic) values (crctopic,intopic);
      insert into sensor.data_storage (timestamp,topicid,value) values (tstamp,crctopic,invalue);
      insert into sensor.last_data (topicid,value) values (crctopic,invalue);
    else
      if (invalue not in (select value from sensor.last_data where topicid=crctopic)) then
	insert into sensor.data_storage (timestamp,topicid,value) values (tstamp,crctopic,invalue);
        update sensor.last_data set value=invalue where topicid=crctopic;
      else
	if (select count(distinct td.value) from (select ds.value from sensor.data_storage ds where ds.topicid=crctopic and (extract(epoch from current_timestamp)-300)*1000 < ds.timestamp order by ds.timestamp desc limit 2) td) = 1 then
          update sensor.data_storage set timestamp=tstamp where topicid=crctopic and timestamp=(select max(timestamp) from akosensor.data_storage where topicid=crctopic);
        else
          insert into sensor.data_storage (timestamp,topicid,value) values (tstamp,crctopic,invalue);
        end if;
      end if; 
    end if; 
END
$$;


CREATE OR REPLACE FUNCTION crc32(text_string text) RETURNS bigint AS $$
DECLARE
    tmp bigint;
    i int;
    j int;
    byte_length int;
    binary_string bytea;
BEGIN
    IF text_string = '' THEN
        RETURN 0;
    END IF;

    i = 0;
    tmp = 4294967295;
    byte_length = bit_length(text_string) / 8;
    binary_string = decode(replace(text_string, E'\\\\', E'\\\\\\\\'), 'escape');
    LOOP
        tmp = (tmp # get_byte(binary_string, i))::bigint;
        i = i + 1;
        j = 0;
        LOOP
            tmp = ((tmp >> 1) # (3988292384 * (tmp & 1)))::bigint;
            j = j + 1;
            IF j >= 8 THEN
                EXIT;
            END IF;
        END LOOP;
        IF i >= byte_length THEN
            EXIT;
        END IF;
    END LOOP;
    RETURN (tmp # 4294967295);
END
$$ IMMUTABLE LANGUAGE plpgsql;
